{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Congestion Charges - Hard\n",
    "\n",
    "You may need to create views to complete these questions - but you do not have permission to create tables or views in the default schema. Your SQL commands are executed by user scott in schema gisq - you may create or drop views and tables in schema scott but not in gisq."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Loading spark-stubs, spark-hive\n",
      "Adding Hive conf dir /opt/hive/conf to classpath\n",
      "Creating SparkSession\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "SLF4J: No SLF4J providers were found.\n",
      "SLF4J: Defaulting to no-operation (NOP) logger implementation\n",
      "SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<a target=\"_blank\" href=\"http://jupyter:4040\">Spark UI</a>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$                                  \n",
       "\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.log4j.{Level, Logger}\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.types._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.functions._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36morg.apache.spark.sql.expressions.Window\n",
       "\n",
       "\u001b[39m\n",
       "\u001b[36mspark\u001b[39m: \u001b[32mSparkSession\u001b[39m = org.apache.spark.sql.SparkSession@20ca1207"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// Fetch the Spark SQL 3.4.0 artifact through the kernel's $ivy import.\n",
    "import $ivy.`org.apache.spark::spark-sql:3.4.0`\n",
    "\n",
    "import org.apache.log4j.{Level, Logger}\n",
    "// Switch off every logger under the \"org\" hierarchy.\n",
    "Logger.getLogger(\"org\").setLevel(Level.OFF)\n",
    "\n",
    "import org.apache.spark._\n",
    "import org.apache.spark.sql._\n",
    "import org.apache.spark.sql.types._\n",
    "import org.apache.spark.sql.functions._\n",
    "import org.apache.spark.sql.expressions.Window\n",
    "\n",
    "// Hive-backed Spark session shared by all cells below. It runs on a local\n",
    "// master; the standalone cluster URL is left commented out as an alternative.\n",
    "// NOTE(review): the cluster address and warehouse path are hard-coded and are\n",
    "// presumably specific to the author's environment - confirm before reuse.\n",
    "// NOTE(review): the executor/cores settings presumably only matter when the\n",
    "// cluster master is enabled - confirm before relying on them in local mode.\n",
    "val spark = {\n",
    "    NotebookSparkSession.builder()\n",
    "    .progress(false)\n",
    "    .appName(\"app18-3\")\n",
    "    // .master(\"spark://192.168.31.31:7077\")\n",
    "    .master(\"local[*]\")\n",
    "    .config(\"spark.sql.warehouse.dir\", \n",
    "            \"hdfs://192.168.31.31:9000/user/hive/warehouse\") \n",
    "    .config(\"spark.cores.max\", \"4\") \n",
    "    .config(\"spark.executor.instances\", \"1\") \n",
    "    .config(\"spark.executor.cores\", \"2\") \n",
    "    .config(\"spark.executor.memory\", \"10g\") \n",
    "    .config(\"spark.shuffle.service.enabled\", \"false\") \n",
    "    .config(\"spark.dynamicAllocation.enabled\", \"false\") \n",
    "    .config(\"spark.sql.catalogImplementation\", \"hive\")\n",
    "    .config(\"spark.sql.repl.eagerEval.enabled\", \"true\")\n",
    "    .config(\"spark.driver.allowMultipleContexts\", \"true\")\n",
    "    .getOrCreate()\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[32mimport \u001b[39m\u001b[36mspark.implicits._\n",
       "\u001b[39m\n",
       "\u001b[32mimport \u001b[39m\u001b[36mspark.sqlContext.implicits._\n",
       "\u001b[39m\n",
       "defined \u001b[32mfunction\u001b[39m \u001b[36msc\u001b[39m\n",
       "\u001b[36mhiveCxt\u001b[39m: \u001b[32msql\u001b[39m.\u001b[32mhive\u001b[39m.\u001b[32mHiveContext\u001b[39m = org.apache.spark.sql.hive.HiveContext@76344eb6"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import spark.implicits._\n",
    "import spark.sqlContext.implicits._\n",
    "// Shorthand for the SparkContext behind the active session.\n",
    "def sc = spark.sparkContext\n",
    "// Entry point used by the cells below to read the sqlzoo tables.\n",
    "// HiveContext has been deprecated since Spark 2.0; the session is already\n",
    "// configured with the Hive catalog, so its own SQLContext is used instead\n",
    "// of constructing a separate HiveContext on top of the same SparkContext.\n",
    "val hiveCxt = spark.sqlContext"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defined \u001b[32mclass\u001b[39m \u001b[36mRichDF\u001b[39m"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// Credit to Aivean\n",
    "/** Gives every DataFrame a `showHTML` method that publishes it as an HTML table. */\n",
    "implicit class RichDF(val ds:DataFrame) {\n",
    "\n",
    "    /** Display text of a single cell value, before any truncation. */\n",
    "    private def cellText(value: Any): String = value match {\n",
    "        case null => \"null\"\n",
    "        case bytes: Array[Byte] => bytes.map(\"%02X\".format(_)).mkString(\"[\", \" \", \"]\")\n",
    "        case items: Array[_] => items.mkString(\"[\", \", \", \"]\")\n",
    "        case items: Seq[_] => items.mkString(\"[\", \", \", \"]\")\n",
    "        case other => other.toString\n",
    "    }\n",
    "\n",
    "    /** Cuts `text` down to `width` characters; a non-positive width disables cutting. */\n",
    "    private def clip(text: String, width: Int): String =\n",
    "        if (width <= 0 || text.length <= width) text\n",
    "        // do not show ellipses for widths shorter than 4 characters.\n",
    "        else if (width < 4) text.substring(0, width)\n",
    "        else text.substring(0, width - 3) + \"...\"\n",
    "\n",
    "    /**\n",
    "     * Publishes the first `limit` rows as an HTML table.\n",
    "     *\n",
    "     * @param limit    maximum number of rows fetched from the DataFrame\n",
    "     * @param truncate maximum characters shown per cell (0 or less = no limit)\n",
    "     */\n",
    "    def showHTML(limit: Int = 50, truncate: Int = 100) = {\n",
    "        import xml.Utility.escape\n",
    "        val header = ds.schema.fieldNames.toSeq\n",
    "        val rows: Seq[Seq[String]] = ds.take(limit).toSeq.map { record =>\n",
    "            record.toSeq.map(value => clip(cellText(value), truncate))\n",
    "        }\n",
    "        // Escape all text before embedding it in markup.\n",
    "        val headerCells = header.map(h => s\"<th>${escape(h)}</th>\").mkString\n",
    "        val bodyRows = rows.map { cells =>\n",
    "            cells.map(c => s\"<td>${escape(c)}</td>\").mkString(\"<tr>\", \"\", \"</tr>\")\n",
    "        }.mkString\n",
    "        publish.html(s\"\"\" <table>\n",
    "                <tr>\n",
    "                 $headerCells\n",
    "                </tr>\n",
    "                $bodyRows\n",
    "            </table>\n",
    "        \"\"\")\n",
    "    }\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "\u001b[36mcamera\u001b[39m: \u001b[32mDataFrame\u001b[39m = [id: int, perim: string]\n",
       "\u001b[36mkeeper\u001b[39m: \u001b[32mDataFrame\u001b[39m = [id: int, name: string ... 1 more field]\n",
       "\u001b[36mvehicle\u001b[39m: \u001b[32mDataFrame\u001b[39m = [id: string, keeper: int]\n",
       "\u001b[36mimage\u001b[39m: \u001b[32mDataFrame\u001b[39m = [camera: int, whn: string ... 1 more field]\n",
       "\u001b[36mpermit\u001b[39m: \u001b[32mDataFrame\u001b[39m = [reg: string, sdate: string ... 1 more field]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// Handles to the five sqlzoo tables used by the questions below.\n",
    "val Seq(camera, keeper, vehicle, image, permit) =\n",
    "    Seq(\"camera\", \"keeper\", \"vehicle\", \"image\", \"permit\")\n",
    "        .map(name => hiveCxt.table(s\"sqlzoo.$name\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1.\n",
    "When creating a view in scott you must specify the schema name of the sources and the destination."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2.\n",
    "There are four types of permit. The most popular type means that this type has been issued the highest number of times. Find out the most popular type, together with the total number of permits issued."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>chargetype</th><th>cnt</th>\n",
       "                </tr>\n",
       "                <tr><td>Daily</td><td>27</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "// Count permits per charge type and keep only the most frequently issued one.\n",
    "(permit\n",
    " .groupBy(col(\"chargetype\"))\n",
    " .agg(count(col(\"reg\")).as(\"cnt\"))\n",
    " .sort(desc(\"cnt\"))\n",
    " .limit(1)\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3.\n",
    "For each of the vehicles caught by camera 19 - show the registration, the earliest time at camera 19 and the time and camera at which it left the zone."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>reg</th><th>earliest</th><th>next</th><th>camera</th>\n",
       "                </tr>\n",
       "                <tr><td>SO 02 CSP</td><td>2007-02-25 07:51:10.0</td><td>2007-02-25 07:55:11.0</td><td>18</td></tr><tr><td>SO 02 DSP</td><td>2007-02-25 16:31:01.0</td><td>2007-02-25 17:42:41.0</td><td>19</td></tr><tr><td>SO 02 JSP</td><td>2007-02-25 17:14:11.0</td><td>2007-02-25 17:17:03.0</td><td>3</td></tr><tr><td>SO 02 TSP</td><td>2007-02-25 07:23:00.0</td><td>2007-02-25 07:26:31.0</td><td>19</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "// Step 1: earliest camera-19 sighting per registration; the join to vehicle\n",
    "// keeps only registrations that exist in the vehicle table.\n",
    "(image.filter(image(\"camera\") === 19)\n",
    " .join(vehicle, (image(\"reg\") === vehicle(\"id\")))\n",
    " .groupBy(\"reg\")\n",
    " .agg(min(\"whn\").alias(\"earliest\"))\n",
    " // Step 2: bring back every sighting of the same registration and keep those\n",
    " // strictly later than `earliest`. Unmatched rows have a null whn and fail the\n",
    " // filter, so the left join effectively behaves like an inner join here.\n",
    " .join(image, \"reg\", joinType=\"left\")\n",
    " .filter(col(\"earliest\") < col(\"whn\"))\n",
    " // Step 3: the first of those later sightings becomes `next`.\n",
    " .groupBy(\"reg\", \"earliest\")\n",
    " .agg(min(\"whn\").alias(\"next\"))\n",
    " // Step 4: look up the camera of that sighting; reg is renamed to reg1 so the\n",
    " // join condition can tell the two registration columns apart.\n",
    " // NOTE(review): `next` is the first later sighting at any camera, not\n",
    " // specifically an OUT camera - confirm this matches \"left the zone\".\n",
    " .withColumnRenamed(\"reg\", \"reg1\")\n",
    " .join(image, ((col(\"reg1\") === image(\"reg\")) && \n",
    "               (col(\"next\") === image(\"whn\"))))\n",
    " .select(\"reg\", \"earliest\", \"next\", \"camera\")\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4.\n",
    "For all 19 cameras - show the position as IN, OUT or INTERNAL and the busiest hour for that camera."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>camera</th><th>type</th><th>hr</th><th>max(n)</th>\n",
       "                </tr>\n",
       "                <tr><td>1</td><td>IN</td><td>6</td><td>1</td></tr><tr><td>2</td><td>IN</td><td>7</td><td>1</td></tr><tr><td>3</td><td>IN</td><td>18</td><td>2</td></tr><tr><td>3</td><td>IN</td><td>17</td><td>3</td></tr><tr><td>5</td><td>IN</td><td>7</td><td>1</td></tr><tr><td>8</td><td>IN</td><td>7</td><td>2</td></tr><tr><td>9</td><td>OUT</td><td>18</td><td>1</td></tr><tr><td>9</td><td>OUT</td><td>6</td><td>1</td></tr><tr><td>9</td><td>OUT</td><td>16</td><td>6</td></tr><tr><td>10</td><td>OUT</td><td>18</td><td>2</td></tr><tr><td>10</td><td>OUT</td><td>5</td><td>1</td></tr><tr><td>10</td><td>OUT</td><td>7</td><td>1</td></tr><tr><td>11</td><td>OUT</td><td>7</td><td>1</td></tr><tr><td>11</td><td>OUT</td><td>18</td><td>2</td></tr><tr><td>12</td><td>OUT</td><td>17</td><td>1</td></tr><tr><td>12</td><td>OUT</td><td>18</td><td>2</td></tr><tr><td>12</td><td>OUT</td><td>7</td><td>1</td></tr><tr><td>15</td><td>OUT</td><td>18</td><td>1</td></tr><tr><td>16</td><td>OUT</td><td>7</td><td>1</td></tr><tr><td>17</td><td>INTERNAL</td><td>6</td><td>2</td></tr><tr><td>17</td><td>INTERNAL</td><td>7</td><td>1</td></tr><tr><td>18</td><td>INTERNAL</td><td>6</td><td>1</td></tr><tr><td>18</td><td>INTERNAL</td><td>7</td><td>3</td></tr><tr><td>18</td><td>INTERNAL</td><td>16</td><td>3</td></tr><tr><td>18</td><td>INTERNAL</td><td>17</td><td>1</td></tr><tr><td>19</td><td>INTERNAL</td><td>17</td><td>2</td></tr><tr><td>19</td><td>INTERNAL</td><td>16</td><td>1</td></tr><tr><td>19</td><td>INTERNAL</td><td>7</td><td>4</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "// Busiest hour per camera. A null perim is reported as INTERNAL.\n",
    "// The join is an inner join, so cameras with no sightings do not appear.\n",
    "(camera\n",
    " .withColumn(\"type\", when(camera(\"perim\").isNull, \"INTERNAL\")\n",
    "             .otherwise(camera(\"perim\")))\n",
    " .join(image.withColumn(\"hr\", hour(image(\"whn\"))),\n",
    "       (camera(\"id\") === image(\"camera\")))\n",
    " // n = number of sightings for each (camera, hour of day)\n",
    " .groupBy(\"camera\", \"type\", \"hr\")\n",
    " .agg(count(\"id\").alias(\"n\"))\n",
    " // Rank the hours within each camera by sightings and keep only the top one.\n",
    " // Grouping by hr a second time (as before) made max(n) a no-op and listed\n",
    " // every hour. Ties are broken deterministically in favour of the earlier hour.\n",
    " .withColumn(\"rnk\", row_number().over(\n",
    "     Window.partitionBy(\"camera\").orderBy(col(\"n\").desc, col(\"hr\"))))\n",
    " .filter(col(\"rnk\") === 1)\n",
    " .select(\"camera\", \"type\", \"hr\", \"n\")\n",
    " .orderBy(\"camera\")\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 5.\n",
    "Anomalous daily permits. Daily permits should not be issued for non-charging days. Find a way to represent charging days. Identify the anomalous daily permits."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>reg</th><th>sdate</th><th>chargetype</th>\n",
       "                </tr>\n",
       "                <tr><td>SO 02 ATP</td><td>2007-01-21 00:00:00.0</td><td>Daily</td></tr><tr><td>SO 02 BTP</td><td>2007-02-03 00:00:00.0</td><td>Daily</td></tr><tr><td>SO 02 BTP</td><td>2007-02-04 00:00:00.0</td><td>Daily</td></tr><tr><td>SO 02 CTP</td><td>2007-01-21 00:00:00.0</td><td>Daily</td></tr><tr><td>SO 02 FTP</td><td>2007-02-25 00:00:00.0</td><td>Daily</td></tr><tr><td>SO 02 HTP</td><td>2006-01-21 00:00:00.0</td><td>Daily</td></tr><tr><td>SO 02 HTP</td><td>2006-01-22 00:00:00.0</td><td>Daily</td></tr><tr><td>SO 02 JTP</td><td>2007-01-21 00:00:00.0</td><td>Daily</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "// Daily permits whose start date falls on a weekend\n",
    "// (dayofweek numbers Sunday as 1 and Saturday as 7).\n",
    "(permit\n",
    " .where(col(\"chargetype\") === \"Daily\")\n",
    " .where(dayofweek(col(\"sdate\")).isin(1, 7))\n",
    " .showHTML())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 6.\n",
    "Issuing fines: Vehicles using the zone during the charge period, on charging days must be issued with fine notices unless they have a permit covering that day. List the name and address of such culprits, give the camera and the date and time of the first offence."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       " <table>\n",
       "                <tr>\n",
       "                 <th>reg</th><th>name</th><th>address</th><th>first_offence</th><th>camera</th>\n",
       "                </tr>\n",
       "                <tr><td>SO 02 GSP</td><td>Incongruous, Ingrid</td><td>Irresolution Pl.</td><td>2007-02-25 07:10:00.0</td><td>5</td></tr><tr><td>SO 02 ASP</td><td>Ambiguous, Arthur</td><td>Absorption Ave.</td><td>2007-02-25 06:10:13.0</td><td>1</td></tr><tr><td>SO 02 ISP</td><td>Incongruous, Ingrid</td><td>Irresolution Pl.</td><td>2007-02-25 16:58:01.0</td><td>9</td></tr><tr><td>SO 02 JSP</td><td>Inconspicuous, Iain</td><td>Interception Rd.</td><td>2007-02-25 17:07:00.0</td><td>3</td></tr><tr><td>SO 02 DSP</td><td>Strenuous, Sam</td><td>Surjection Street</td><td>2007-02-25 16:29:11.0</td><td>18</td></tr><tr><td>SO 02 CSP</td><td>Ambiguous, Arthur</td><td>Absorption Ave.</td><td>2007-02-25 06:57:31.0</td><td>17</td></tr><tr><td>SO 02 HSP</td><td>Assiduous, Annie</td><td>Attribution Alley</td><td>2007-02-25 16:45:04.0</td><td>9</td></tr>\n",
       "            </table>\n",
       "        "
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "\u001b[36mf\u001b[39m: \u001b[32mDataset\u001b[39m[\u001b[32mRow\u001b[39m] = [keeper: int, reg: string ... 8 more fields]\n",
       "\u001b[36ma\u001b[39m: \u001b[32mDataFrame\u001b[39m = [reg: string, name: string ... 2 more fields]"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "// Validity window of every permit, derived from its start date and charge type.\n",
    "// reg is renamed so it cannot clash with image.reg in the join below.\n",
    "val permitSpan = (permit\n",
    "     .withColumn(\"sdate\", to_timestamp(col(\"sdate\")))\n",
    "     .withColumn(\"edate\",\n",
    "        when(col(\"chargetype\") === \"Daily\", col(\"sdate\") + expr(\"interval 1 day\"))\n",
    "        .when(col(\"chargetype\") === \"Weekly\", col(\"sdate\") + expr(\"interval 1 week\"))\n",
    "        .when(col(\"chargetype\") === \"Monthly\", col(\"sdate\") + expr(\"interval 1 month\"))\n",
    "        .when(col(\"chargetype\") === \"Annual\", col(\"sdate\") + expr(\"interval 1 year\")))\n",
    "     .select(col(\"reg\").alias(\"permit_reg\"), col(\"sdate\"), col(\"edate\")))\n",
    "\n",
    "// Offending sightings: images NOT covered by ANY permit of that vehicle.\n",
    "// The anti join fixes two defects of the earlier permit-by-permit filter:\n",
    "//  * vehicles with no permit at all were silently dropped, and\n",
    "//  * a sighting was flagged as soon as it fell outside one of several permits.\n",
    "// NOTE(review): charging days and the charge period are not applied here\n",
    "// (they were not applied before either) - TODO confirm the intended hours.\n",
    "val f = (image\n",
    "     .join(permitSpan,\n",
    "           (image(\"reg\") === col(\"permit_reg\")) &&\n",
    "           (image(\"whn\") >= col(\"sdate\")) && (image(\"whn\") <= col(\"edate\")),\n",
    "           \"left_anti\")\n",
    "     // attach the keeper id, then the keeper's name and address\n",
    "     .join(vehicle.withColumnRenamed(\"id\", \"reg\"), \"reg\")\n",
    "     .join(keeper.withColumnRenamed(\"id\", \"keeper\"), \"keeper\"))\n",
    "\n",
    "// First offence per registration, keeping the camera of that same sighting.\n",
    "// A window replaces the earlier self-join of the aggregate back onto f.\n",
    "val a = (f\n",
    "     .withColumn(\"rn\", row_number().over(\n",
    "         Window.partitionBy(\"reg\").orderBy(col(\"whn\"), col(\"camera\"))))\n",
    "     .filter(col(\"rn\") === 1)\n",
    "     .select(col(\"reg\"), col(\"name\"), col(\"address\"),\n",
    "             col(\"whn\").alias(\"first_offence\"), col(\"camera\")))\n",
    "\n",
    "a.showHTML()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Stop the Spark session once every query above has been displayed.\n",
    "spark.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Scala",
   "language": "scala",
   "name": "scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".sc",
   "mimetype": "text/x-scala",
   "name": "scala",
   "nbconvert_exporter": "script",
   "version": "2.12.15"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
